Skip to content

LCORE-2311: Relocated stream serialization utilities#1910

Merged
tisnik merged 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils
Jun 12, 2026
Merged

LCORE-2311: Relocated stream serialization utilities#1910
tisnik merged 1 commit into
lightspeed-core:mainfrom
asimurka:relocate_streaming_utils

Conversation

@asimurka

@asimurka asimurka commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Description

Relocating utility function for stream serialization to a separate module.

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: (e.g., Claude, CodeRabbit, Ollama, etc., N/A if not used)
  • Generated by: (e.g., tool name and version; N/A if not used)

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • Refactor

    • Consolidated streaming event utilities into a shared module for improved code organization and maintainability.
  • Tests

    • Added comprehensive test coverage for streaming response formatting across JSON and text modes.

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 8134a3fb-e4d1-4ced-b481-72023544d51e

📥 Commits

Reviewing files that changed from the base of the PR and between a6cd9eb and c417e0d.

📒 Files selected for processing (4)
  • src/app/endpoints/streaming_query.py
  • src/utils/streaming_sse.py
  • tests/unit/app/endpoints/test_streaming_query.py
  • tests/unit/utils/test_streaming_sse.py
📜 Recent review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (3)
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/test_streaming_sse.py
  • tests/unit/app/endpoints/test_streaming_query.py
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/utils/streaming_sse.py
  • src/app/endpoints/streaming_query.py
src/app/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/app/**/*.py: FastAPI dependencies: Import from fastapi module for APIRouter, HTTPException, Request, status, Depends
Use FastAPI HTTPException with appropriate status codes for API endpoints and handle APIConnectionError from Llama Stack

Files:

  • src/app/endpoints/streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.

Applied to files:

  • src/app/endpoints/streaming_query.py
🪛 ast-grep (0.43.0)
src/utils/streaming_sse.py

[info] 65-65: use jsonify instead of json.dumps for JSON output
Context: json.dumps(d)
Note: Security best practice.

(use-jsonify)

🔇 Additional comments (19)
src/app/endpoints/streaming_query.py (4)

683-685: Missing media_type parameter in http_exception_stream_event call.

This issue was already identified in a previous review. The http_exception_stream_event(e) call does not pass the negotiated media_type, which is available at line 652. For MEDIA_TYPE_TEXT requests, this could cause compaction-path errors to return a different shape than other error paths (lines 692, 695-698, 701-703) which all pass media_type to stream_http_error_event.


79-79: LGTM!


126-135: LGTM!


1096-1096: LGTM!

tests/unit/app/endpoints/test_streaming_query.py (1)

113-132: LGTM!

src/utils/streaming_sse.py (9)

27-29: Prefer modern union syntax for optional parameters.

This issue has already been flagged in a previous review.


1-25: LGTM!


30-54: LGTM!


57-68: LGTM!


70-139: LGTM!


142-182: LGTM!


185-212: LGTM!


215-233: LGTM!


236-259: LGTM!

tests/unit/utils/test_streaming_sse.py (5)

1-26: LGTM!


28-104: LGTM!


106-204: LGTM!


206-239: LGTM!


241-278: LGTM!


Walkthrough

This PR extracts Server-Sent Events (SSE) formatting utilities and shield-violation generation from the streaming_query endpoint module into a new reusable utils/streaming_sse.py module, updates the endpoint to import these utilities, and reorganizes test coverage accordingly.

Changes

Streaming SSE Utilities Extraction and Refactoring

Layer / File(s) Summary
SSE utilities module
src/utils/streaming_sse.py
New module provides SSE event formatting helpers for streaming responses, including error rendering with text/JSON modes, lifecycle event markers (start/compaction/interruption/end), per-event payload serialization with media-type branching, and async shield-violation token generation.
SSE utilities test coverage
tests/unit/utils/test_streaming_sse.py
Comprehensive unit test coverage for SSE event formatting across JSON and text media types, including assertions for stream lifecycle events, per-event serialization, error rendering, and async shield-violation generation.
Endpoint refactored to use utilities
src/app/endpoints/streaming_query.py
Removes local SSE implementations, imports the new utilities from utils.streaming_sse, removes unused ReferencedDocument import, and delegates error and event rendering to the shared utility functions.
Endpoint tests simplified
tests/unit/app/endpoints/test_streaming_query.py
Removes unit tests for SSE helpers now covered by the utilities test module, consolidates OLS compatibility checks, and drops cancellation/interrupt behavior tests that validated implementation details now owned by the utilities.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • lightspeed-core/lightspeed-stack#1796: Related PR that originally introduced stream_compaction_event and other SSE helpers; this PR consolidates those helpers into the new shared streaming_sse module.

Suggested reviewers

  • tisnik
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: relocating stream serialization utilities from a single module to a dedicated shared module.
Docstring Coverage ✅ Passed Docstring coverage is 82.46% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@asimurka asimurka requested a review from tisnik June 11, 2026 16:03
@asimurka asimurka marked this pull request as draft June 11, 2026 16:10

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/unit/utils/test_streaming_sse.py (1)

268-278: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add content assertion to match the JSON test's thoroughness.

The JSON test at line 266 verifies that the violation message appears in the result, but this text test only checks that items were generated. For consistency and completeness, add a similar content assertion.

🧪 Proposed fix
         assert len(result) > 0
+        assert any("Violation message" in item for item in result)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/utils/test_streaming_sse.py` around lines 268 - 278, The test for
text media type currently only checks that shield_violation_generator produced
items; update the test_shield_violation_generator_text to also assert that the
generated content contains the expected violation message ("Violation message").
Locate the async test_shield_violation_generator_text and after collecting
result from shield_violation_generator(..., MEDIA_TYPE_TEXT) add a content
assertion (e.g., join or inspect result items) to confirm the string "Violation
message" appears, mirroring the JSON test's thoroughness.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/app/endpoints/streaming_query.py`:
- Around line 483-485: The compaction-path except block yields
http_exception_stream_event(e) without preserving the negotiated media type,
causing text-mode errors to lose plain-text formatting; update the except
handler in streaming_query.py to pass the negotiated media type into the
formatter (e.g., call http_exception_stream_event(e, media_type) or route this
branch through stream_http_error_event(e, media_type)) so MEDIA_TYPE_TEXT
requests get the same media-type-aware error shape as the non-compaction path.

In `@src/utils/stream_interrupt.py`:
- Around line 35-43: Update type hints and docstrings in
src/utils/stream_interrupt.py: replace all uses of Optional[ResponseInput] with
the modern union ResponseInput | None (including signatures of
persist_interrupted_turn and register_interrupt_callback) and remove the
Optional import; add Google-style docstring sections (Parameters, Returns,
Raises) for background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn, and
register_interrupt_callback (ensure register_interrupt_callback documents the
Raises clause), and add a short docstring for the nested async def
on_interrupt() -> None explaining its purpose and exceptions; keep identifiers
intact (background_update_topic_summary,
shutdown_background_topic_summary_tasks, persist_interrupted_turn,
register_interrupt_callback, on_interrupt) so reviewers can locate the changes.

In `@src/utils/streaming_sse.py`:
- Around line 27-29: The type annotation for the media_type parameter in
stream_http_error_event uses Optional[str]; update it to the modern union syntax
str | None (i.e., change "media_type: Optional[str] = MEDIA_TYPE_JSON" to
"media_type: str | None = MEDIA_TYPE_JSON") and remove any now-unnecessary
Optional import if present; keep the function name stream_http_error_event and
default value MEDIA_TYPE_JSON unchanged.

In `@tests/unit/utils/test_stream_interrupt.py`:
- Around line 114-115: Replace the fragile fixed sleep (await
asyncio.sleep(0.01)) with an explicit completion mechanism: have the test wait
for the actual work to finish by either awaiting the spawned task, waiting on an
asyncio.Event set by the background callback, or draining
utils.stream_interrupt.background_topic_summary_tasks (e.g., await all tasks in
that set) so the assertion only runs after the consumer/callback has completed;
update the spots mentioned (around lines with await asyncio.sleep, and similarly
at the other locations noted) to use one of these explicit waits and ensure the
cancelled consumer task is awaited/joined instead of relying on time-based
sleeps.
- Around line 188-193: The test uses AsyncMock instances but asserts them with
sync assertions; change append_turn_mock.assert_called_once_with(...) to
await-based assertion append_turn_mock.assert_awaited_once_with(...) and
likewise change get_topic_summary_mock.assert_called_once_with(...) to
get_topic_summary_mock.assert_awaited_once_with(...). Also remove timing-based
await asyncio.sleep(0.1) and replace with deterministic synchronization: await
the spawned background task or wait on an asyncio.Event/Task.result() that the
test triggers so the AsyncMocks are guaranteed to have been awaited before
asserting.

---

Outside diff comments:
In `@tests/unit/utils/test_streaming_sse.py`:
- Around line 268-278: The test for text media type currently only checks that
shield_violation_generator produced items; update the
test_shield_violation_generator_text to also assert that the generated content
contains the expected violation message ("Violation message"). Locate the async
test_shield_violation_generator_text and after collecting result from
shield_violation_generator(..., MEDIA_TYPE_TEXT) add a content assertion (e.g.,
join or inspect result items) to confirm the string "Violation message" appears,
mirroring the JSON test's thoroughness.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 89bec62d-b14f-488a-a6d2-a07ef1281999

📥 Commits

Reviewing files that changed from the base of the PR and between 6116ef7 and a6cd9eb.

📒 Files selected for processing (7)
  • src/app/endpoints/streaming_query.py
  • src/app/main.py
  • src/utils/stream_interrupt.py
  • src/utils/streaming_sse.py
  • tests/unit/app/endpoints/test_streaming_query.py
  • tests/unit/utils/test_stream_interrupt.py
  • tests/unit/utils/test_streaming_sse.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: integration_tests (3.13)
  • GitHub Check: integration_tests (3.12)
  • GitHub Check: list_outdated_dependencies
  • GitHub Check: build-pr
  • GitHub Check: E2E: library mode / ci / group 3
  • GitHub Check: unit_tests (3.13)
  • GitHub Check: unit_tests (3.12)
  • GitHub Check: E2E: library mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 1
  • GitHub Check: E2E: server mode / ci / group 3
  • GitHub Check: E2E: server mode / ci / group 2
  • GitHub Check: E2E: library mode / ci / group 2
  • GitHub Check: Pylinter
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-0-6-on-pull-request
  • GitHub Check: Konflux kflux-prd-rh02 / lightspeed-stack-on-pull-request
🧰 Additional context used
📓 Path-based instructions (3)
src/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/**/*.py: Use absolute imports for internal modules: from authentication import get_auth_dependency
Llama Stack imports: Use from llama_stack_client import AsyncLlamaStackClient
Check constants.py for shared constants before defining new ones
All modules must start with descriptive docstrings explaining purpose
Use logger = get_logger(__name__) from log.py for module logging
All functions must have complete type annotations for parameters and return types, use modern syntax (str | int), and include descriptive docstrings
Use snake_case with descriptive, action-oriented names for functions (get_, validate_, check_)
Avoid in-place parameter modification anti-patterns; return new data structures instead of modifying function parameters
Use async def for I/O operations and external API calls
Use standard log levels with clear purposes: debug() for diagnostic info, info() for program execution, warning() for unexpected events, error() for serious problems
All classes must have descriptive docstrings explaining purpose and use PascalCase with standard suffixes: Configuration, Error/Exception, Resolver, Interface
Abstract classes must use ABC with @abstractmethod decorators
Follow Google Python docstring conventions with required sections: Parameters, Returns, Raises, and Attributes for classes

Files:

  • src/app/main.py
  • src/utils/stream_interrupt.py
  • src/utils/streaming_sse.py
  • src/app/endpoints/streaming_query.py
src/app/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

src/app/**/*.py: FastAPI dependencies: Import from fastapi module for APIRouter, HTTPException, Request, status, Depends
Use FastAPI HTTPException with appropriate status codes for API endpoints and handle APIConnectionError from Llama Stack

Files:

  • src/app/main.py
  • src/app/endpoints/streaming_query.py
tests/**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

tests/**/*.py: Use pytest for all unit and integration tests; do not use unittest
Use pytest.mark.asyncio marker for async tests

Files:

  • tests/unit/utils/test_streaming_sse.py
  • tests/unit/utils/test_stream_interrupt.py
  • tests/unit/app/endpoints/test_streaming_query.py
🧠 Learnings (1)
📚 Learning: 2026-04-06T20:18:07.852Z
Learnt from: major
Repo: lightspeed-core/lightspeed-stack PR: 1463
File: src/app/endpoints/rlsapi_v1.py:266-271
Timestamp: 2026-04-06T20:18:07.852Z
Learning: In the lightspeed-stack codebase, within `src/app/endpoints/` inference/MCP endpoints, treat `tools: Optional[list[Any]]` in MCP tool definitions as an intentional, consistent typing pattern (used across `query`, `responses`, `streaming_query`, `rlsapi_v1`). Do not raise or suggest this as a typing issue during code review; changing it in isolation could break endpoint typing consistency across the codebase.

Applied to files:

  • src/app/endpoints/streaming_query.py
🪛 ast-grep (0.43.0)
src/utils/streaming_sse.py

[info] 65-65: use jsonify instead of json.dumps for JSON output
Context: json.dumps(d)
Note: Security best practice.

(use-jsonify)

🔇 Additional comments (11)
src/utils/streaming_sse.py (6)

1-24: LGTM!


57-67: LGTM!


70-139: LGTM!


185-212: LGTM!


215-259: LGTM!


176-176: Keep the SSE "end" event truncated field as null. EndEventData.truncated is Optional[bool], the "end" payload is constructed with truncated=None, and unit tests plus docs/openapi.json expect "truncated": null for the end event—so it shouldn’t be removed.

tests/unit/utils/test_streaming_sse.py (5)

1-26: LGTM!


28-104: LGTM!


106-204: LGTM!


206-239: LGTM!


241-251: LGTM!

Comment thread src/app/endpoints/streaming_query.py
Comment thread src/utils/stream_interrupt.py Outdated
Comment thread src/utils/streaming_sse.py
Comment thread tests/unit/utils/test_stream_interrupt.py Outdated
Comment thread tests/unit/utils/test_stream_interrupt.py Outdated
@asimurka asimurka force-pushed the relocate_streaming_utils branch from a6cd9eb to c417e0d Compare June 12, 2026 06:00
@asimurka asimurka changed the title LCORE-2311: Relocated streaming query utilities LCORE-2311: Relocated stream serialization utilities Jun 12, 2026
@asimurka asimurka marked this pull request as ready for review June 12, 2026 06:42

@tisnik tisnik left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@tisnik tisnik merged commit 273c0eb into lightspeed-core:main Jun 12, 2026
35 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants